梦入琼楼寒有月,行过石树冻无烟

Spring cloud Ribbon or RestTemplate and Feign

首先我们只需要知道在 Ribbon 和 Feign 或 RestTemplate 中,Ribbon 主要基于 HTTP和TCP出来进行客户端的负载均衡,他是由 Netfix 公司所开源项目 Netfix OSS 的一部分,可自动与 Netfix Server Discover 也就是 Eureka 进行交互。

Ribbon 是不可以独立部署的,Spring cloud Ribbon 是基于 Ribbon 的实现。基于了轮廓、随机等规则自动调用服务,当然除此之外还可以根据自身需要来定义均衡算法。

而 Feign 和 RestTemplate 都是用于实现服务发现, Feign 主要是声明试的 WebService 客户端,为我们提供了一个快捷、优雅的调用 HTTP API。后者则是需要服务的 IP地址等这些信息来实现各个服务之间的通信调用,区别就是 Feign 会 比 RestTemplate 简洁和优雅许多

负载均衡


在仔细讲解 Ribbon 与 Feign 或 RestTemplate 之前,他们都是涉及或实现出负载均衡这个功能,因此负载均衡在这里尤为重要。

负载均额(Load Balance)从字面意思上我们可以理解为当一个数据请求时可以分摊多个单元进行分流,通常负载均衡会分为服务器端负载均衡以及客户端负载均衡两种。

服务器端负载均衡

服务器端负载均衡主要是应对高并发和服务器端扩容的重要方法之一,负载均衡也通常讲的是服务器端负载均衡。服务器端的负载均衡主要通过在服务器与客户端之间添加负载均衡器进行实现,主要分为硬件和软件负载均衡,这里我们主要介绍软件的负载均衡。

无论是软件的负载均衡还是硬件的负载均衡均维护着一个正常服务清单,通过心跳机制来删除出现故障的服务节点,也可以通过他来恢复服务节点。

软件负载均衡主要是在普通的服务器上安装具有负载均衡的软件来实现请求的分发,进而实现负载均衡,需要注意的是服务器端的 “正常服务提供者清单” 是存储在负载均衡器中的。

客户端负载均衡

客户端负载均衡与服务器端负载均衡相差无几,他们的区别主要是客户端他本身拥有 “正常服务提供者清单”,在客户端负载均衡中。所有客户端管理都管理着一份自己需要访问的服务提供者清单,而这些清单大多数从服务中心进行获取。

Ribbon


Ribbon 是 Netfix 公司所开源项目 Netfix OSS 的一部分,主要提供一个客户端负载均衡的云库。他主要基于 HTTP 和 TCP 的客户端负载均衡组件,之后的 Spring cloud Ribbon 是一个基于 Ribbon 实现,主要特点是可以根据需要自定义负载均衡算法。

Choose Server 是负载均衡策略中 “线性轮询策略(Round Robin Ruie)” 工作流程内的第三步骤,其作用是获取服务列表中取服务进行请求,如果连续十次都没有获取到服务则报错。

从上图中我们可以看到,Ribbon 本身所维护着 正常服务提供者清单的有效性,如果通过 ChooseServer 不可用,则会重新从服务中心获取有效的服务提供者清单来进行更新。

Ribbon 接口类型

Type Type Info Bean Name Class Name
IClientConfig 用于读取配置,实现类是 DefaultClientConfigIcmpl,而默认值也是该类 ribbonClientConfig DefaultClientConfigIcmpl
IRule 负责处理负载均衡规则,实现类是 ZoneAvoidanceRule 默认通过他来选择实例,步骤是: ribbonRule ZoneAvoidanceRule
1.ServerList 获取所有可用的服务提供者列表
2.ServerListFileter 过滤一部分服务提供者地址
3.最后在剩下的地址中通过 IRule 选择一台服务器
IPing 用来筛选掉无法访问的实例 ribbonPing DummyPing
ServerList <server> 用于获取服务提供者地址列表,可以是一组固定地址也可是服务中心中定期查询服务提供者的地址列表 ribbonServerList ConfigurationBasedServerList
ServerListFileter <server> 在原始服务提供者地址列表中,通过使用一定的策略过滤一部分不符合条件的地址(当动态使用 ServerList时使用) ribbonServerListFilter ZonePrefernenceServerListFilter
ILoadBalancer 可通过负载均衡中选择一个服务器,并通过标记暂停服务的服务器,一可以获取所有已知的服务器提供者列表 ribbonLoadBalancer ZoneAwareLoadBalancer or BaseLoadBalancer
ServerListUpdater 用于ServerList的更新,当服务中心的服务提供者发生变化的时候,ServerList 会根据 PollingServerListUpdater 来实现定时更新服务提供者列表 ribbonServerListUpdate PollingServerListUpdater

负载均衡器

负载均衡器很多都是通过 Ribbon 接口类型来进行实现具体的负载均衡 Bean 如:

| Bean | Type | Class Name |
| — | — | — | — |
| LoadBalancerClient | ILoad Balancer | RibbonLoadBalancerClient |
| AbstractLoadBalancer | ILoad Balancer | AbstractLoadBalancer |

LoadBalancer

在初始化时 execute() 方法会通过ILoadBalancer 来从服务中心获取服务提供者地址列表,并每 10s 来检测一下服务的可用性。如果服务端可用性发生改变,或者数量不一致,那么 RibbonLoadBanlancer 会从注册中心更新服务提供者地址列表,之后可以根据 IRule 来进行负载均衡。

LoadBanlancer主要的职责是添加服务器、选择服务器、获取所有的服务器列表、获取可用的服务提供者列表等。

Method Name Method Info
addServers(List<Server> newServers) 向服务器初始列表中增加新的服务提供者地址列表
chooseServer(Object key) 从负载均衡器中选择一个服务器
markServerDown(Server server) 通知或标记已经暂停服务的服务器
getReachableServers() 返回获取到可用的服务提供者列表
getAllServers 获取所有看到服务提供者列表

Load Balancer 主要用于定义软件负载均衡的操作接口,一个典型的负载负载均衡实现需要一组服务器进行。通常一个方法来标记特定的服务器不循环,一个调用将现有的服务器列表中选择一个服务器来进行提供服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.netflix.loadbalancer;

import java.util.List;

/**
* 定义软件负载均衡的操作接口,一个典型的负载负载均衡实现需要一组服务器进行。通常一个方法来标记特定的服务器不循环
* 一个调用将现有的服务器列表中选择一个服务器来进行提供服务。
*
* @author stonse
*
*/
public interface ILoadBalancer {

/**
* 向服务器初始列表中增加新的服务提供者地址列表(host:port)
* @param 添加新服务提供者
*/
public void addServers(List<Server> newServers);

/**
* 从负载均衡器中选择一个服务器
*
* @param 负载均衡器将可用于确定返回那个服务器的对象,如为空则
* 负载均衡器将不会使用此参数
* @return 选择服务器
*/
public Server chooseServer(Object key);

/**
* 由负载均衡器的客户端掉应来通知某个服务器暂停服务,否则 ILoadBalancer 依然认为他可以提供服务
* 直到下一次检测的周提
*
* @param 标记已经暂停服务的服务器
*/
public void markServerDown(Server server);

/**
* @deprecated getServerList 在 2016-01-20 就被弃用,取而代之的则是 getReachableServers 以及 getAllServer API。
*
* @param 如果为 true 则只应用返回可用的服务器
*/
@Deprecated
public List<Server> getServerList(boolean availableOnly);

/**
* @return 返回获取到可用的服务提供者列表
*/
public List<Server> getReachableServers();

/**
* @return 获取所有看到服务提供者列表
*/
public List<Server> getAllServers();
}

AbstractLoadBalancer

Abstract Load Balancer 是 ILoad Balancer 的实现类,他包含了大多数负载均衡实现所需要的功能,典型的是 Load Balancer 的结构:

  1. 基于特定的标准可以分时段的服务提供者信息列表
  2. 通过规则的定义和实现负载均衡的策略类
  3. 定义并实现一种机制来确定服务提供者表单中的节点、可用性的类。

在这三类中,Abstract Load Balancer 类实现了服务提供者地址列表分组的作用,被 服务器组(ServerGroup) 来定义:

Method name Method info
ServerGroup
ALL:所有服务
STATUS_UP:正常运行的服务
STATUS_NOT_UP:下线或崩溃的服务
chooseServer
从负载均衡中选择一个服务器
getServerList(ServerGroup serverGroup)
获取负载均衡中所有服务的实例列表
getLoadBalancerStats()
从 Load Balancer 来获取每个服务的所有细节统计信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.netflix.loadbalancer;

import java.util.List;

/**
* AbstractLoadBalancer contains features required for most loadbalancing
* implementations.
*
* An anatomy of a typical LoadBalancer consists of 1. A List of Servers (nodes)
* that are potentially bucketed based on a specific criteria. 2. A Class that
* defines and implements a LoadBalacing Strategy via <code>IRule</code> 3. A
* Class that defines and implements a mechanism to determine the
* suitability/availability of the nodes/servers in the List.
*
*
* @author stonse
*
*/
public abstract class AbstractLoadBalancer implements ILoadBalancer {

public enum ServerGroup{
ALL,
STATUS_UP,
STATUS_NOT_UP
}

/**
* delegate to {@link #chooseServer(Object)} with parameter null.
*/
public Server chooseServer() {
return chooseServer(null);
}


/**
* List of servers that this Loadbalancer knows about
*
* @param serverGroup Servers grouped by status, e.g., {@link ServerGroup#STATUS_UP}
*/
public abstract List<Server> getServerList(ServerGroup serverGroup);

/**
* Obtain LoadBalancer related Statistics
*/
public abstract LoadBalancerStats getLoadBalancerStats();
}
BaseLoadBalancer

Base Load Balancer 类是 Abstract Load Balancer 的实现类或工具类,他可以用一个 List 集合(AllServerList)来保存所有服务实例,之后用另一个 List 保存(UpServerList)当前有效的服务实例:

1
2
3
4
5
6
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());
Method name Method info
IpingStrategy 用于定义检查服务的策略,将 ping 所有的服务器(如果执行的很慢可能是你有大量的服务器在此集群中)
chooseServer 调用 rule 类中的 choose 方法来选择服务器对象(如果没有则返回 null)
PingTask() 每 x 秒运行一次定时器任务,检查服务器列表中每个服务器/节点的状态(默认为 1000s)
markServerDown() 用于标注服务是否有效(mark Server Down called for server)
getReachableServer() 获取所有有效的服务实例列表
getAllServer() 拥有获取所有服务器实例列表
addServer() 向负载均衡器中添加一个新的服务实例列表

除此之外,Base Load Balancer 类中的子类 DynamicServerListLoadBalancer 以及 ZoneAwareLoadBalancer 作为 DynamicServerListLoadBalancer 的子类,都实现了了一些能力,分别为:

DynamicServerListLoadBalancer
在负载均衡器的基础上做了进一步的扩展,可以在服务实例清单在运行时的动态更新的实现,以及还提供了一个过滤器标准来过滤掉不符合所需标准的服务器。

Ribbon Ping 的实现

在上述介绍中,我们都设计到了 Ping 或 “心跳” 这个概念,负载均衡中 Ping 机制主要用于检测服务提供者的有效性。他会每隔一段时间执行 Ping 来判断服务器是否存活 ,而这些工作都将由 IPing 和他的实现类来负责(Ribbon 默认实现类是 DummyPing,但需要注意的是默认情况下不会激活 Ping 机制)。

Class Name Class Info
DummyPing 虚拟的 Ping 实现,当确定服务器活着的时候会返回 true
NoOpPing 设么都不做直接返回 true
PingConstant IPing 的实现类,他用于返回任何设置的内容,true 或 false(通常只要常量参数为 true 为服务实例存活,否则为失效)

DummyPing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.netflix.loadbalancer;

import com.netflix.client.config.IClientConfig;

/**
* Default simple implementation that marks the liveness of a Server
*
* @author stonse
*
*/
public class DummyPing extends AbstractLoadBalancerPing {

public DummyPing() {
}

public boolean isAlive(Server server) {
return true;
}

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}

NoOpPing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.netflix.loadbalancer;

/**
* No Op Ping
* @author stonse
*
*/
public class NoOpPing implements IPing {

@Override
public boolean isAlive(Server server) {
return true;
}

}

PingConstant

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.netflix.loadbalancer;

/**
* A utility Ping Implementation that returns whatever its been set to return
* (alive or dead)
* @author stonse
*
*/
public class PingConstant implements IPing {
boolean constant = true;

public void setConstant(String constantStr) {
constant = (constantStr != null) && (constantStr.toLowerCase().equals("true"));
}

public void setConstant(boolean constant) {
this.constant = constant;
}

public boolean getConstant() {
return constant;
}

public boolean isAlive(Server server) {
return constant;
}
}

负载均衡策略

AbstractLoadBalancerRule 是一个 IRule 的继承,他本身则是一个负载均衡策略的抽象类,而 IRule 主要定义了 ILoadBalancer ,其最为核心的方法为 choose(),这是用于选择服务器对象(服务实例),如过没有将会返回 null。

而定义 IRule 主要的目的第一就是为了辅助负载均衡器(ILoadBalancer来通过负载均衡策略选择合适的服务实例,其默认使用的是 九大负载均衡 策略中的 线性轮询策略(RoundRobinRule)

线性轮询策略(Round Robin Rule)

由于 Round Robin Rule 是 ILoadBalancer 默认采用的负载均衡策略,因此也和他的流程非常符合,与此同时他还是作为著名的负载均衡策略,他主要定义了 AVAILABLE_ONLY_SERVERS(仅可用服务器)ALL_SERVERS(所有服务器) 两个状态。

  1. choose(ILoadBalancer lb, Object key) 方法从负载均衡中选择一个服务器并计数,如果没有则会返回“no load balancer”
1
2
3
4
5
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
  1. incrementAndGetModulo 方法会通过serverCount 函数来请求 allServers(所有服务器),而 incrementAndGetModulo 方法则会获取下一个索引,也就是说 获取所有服务器,并从 0 的基础上 +1
1
2
3
4
5
6
7
8
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get();
int next = (current + 1) % modulo;
if (nextServerCyclicCounter.compareAndSet(current, next))
return next;
}
}
  1. 最后通过 choose(Object key) 通过索引去服务列表获取服务,如果连续 10次没有获取到服务,则会返回: “No available alive servers after 10 tries from load balancer”。假设 可达服务器(reachableServers)服务器总数(allServers) 为 0,则会输出: “No up servers available from load balancer” 错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers();
List<Server> allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();

if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}

int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}

// Next.
server = null;
}

if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}

重试策略(Retry Rule)

重试策略(Retry Rule)是 Rule 的级联,他采用 Round Robin Rule的 choose()方法来获取服务器实例,最大重试次数(maxRetryMillis)500,因此他在选择实例和重试方法是:

级联(cascade)在计算机科学中指多个对象之间一对多的映射关系,可以理解为一张表 A 用于存放学生所在班级(姓名、性别、年龄)而姓名作为主键,而另一张表 B 存放着楼层住户信息(姓名、性别)他们之间通过 姓名、年龄 来作为级联

  1. 如果通过 choose() 方法获取服务器实例正常,则回答并返回数据

  2. 假设超过了 最大的重试次数(maxRetryMillis) 没有获取到 “活着的服务器”,则返回 null

    1
    2
    3
    4
    5
    if ((answer == null) || (!answer.isAlive())) {
    return null;
    } else {
    return answer;
    }
  3. 最后如果没有活着的服务回答在当前时间小于 500 ms 的情况下,则会不断的在这时间段重试。

1
2
3
4
5
6
7
if (((answer == null) || (!answer.isAlive()))
&& (System.currentTimeMillis() < deadline)) {
/* pause and retry hoping it's transient */
Thread.yield();
} else {
break;
}

加权响应时间策略(WeightedResponseTimeRule)

WeightedResponseTimeRule 是 RoundRobinRule 的延伸,对一些功能进行了扩展、可以根据服务实例的运行情况计算出服务实例的权重,之后进行服务实例的挑选。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
// get hold of the current reference in case it is changed from the other thread
List<Double> currentWeights = accumulatedWeights;
if (Thread.interrupted()) {
return null;
}
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();

if (serverCount == 0) {
return null;
}

int serverIndex = 0;

// last one in the list is the sum of all weights
double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
// No server has been hit yet and total weight is not initialized
// fallback to use round robin
if (maxTotalWeight < 0.001d) {
server = super.choose(getLoadBalancer(), key);
if(server == null) {
return server;
}
} else {
// generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
double randomWeight = random.nextDouble() * maxTotalWeight;
// pick the server index based on the randomIndex
int n = 0;
for (Double d : currentWeights) {
if (d >= randomWeight) {
serverIndex = n;
break;
} else {
n++;
}
}

server = allList.get(serverIndex);
}

if (server == null) {
/* Transient. */
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

// Next.
server = null;
}
return server;
}
  1. choose 方法获取服务实例时,如果服务负载均衡器和所有访问器为空,则返回 null
  2. 之后当前所有服务的索引的最后一位,则是所有权重的总和,之后生成 0~总和 的随机权重
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
        try {
logger.info("Weight adjusting job started");
AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
LoadBalancerStats stats = nlb.getLoadBalancerStats();
if (stats == null) {
// no statistics, nothing to do
return;
}
double totalResponseTime = 0;
// find maximal 95% response time
for (Server server : nlb.getAllServers()) {
// this will automatically load the stats if not in cache
ServerStats ss = stats.getSingleServerStat(server);
totalResponseTime += ss.getResponseTimeAvg();
}
// weight for each server is (sum of responseTime of all servers - responseTime)
// so that the longer the response time, the less the weight and the less likely to be chosen
Double weightSoFar = 0.0;

// create new list and hot swap the reference
List<Double> finalWeights = new ArrayList<Double>();
for (Server server : nlb.getAllServers()) {
ServerStats ss = stats.getSingleServerStat(server);
double weight = totalResponseTime - ss.getResponseTimeAvg();
weightSoFar += weight;
finalWeights.add(weightSoFar);
}
setWeights(finalWeights);
} catch (Exception e) {
logger.error("Error calculating server weights", e);
} finally {
serverWeightAssignmentInProgress.set(false);
}

}
}

在这其中作为最终要的角色则是 DynamicServerWeightTask 方法,主要通过 totalResponseTime(总响应时间) 来计算权重,其原理就是使用 ServerStats 在负载均衡器中捕获每个服务器的各种统计信息,那么总响应时间就是通过他的 getResponseTimeAvg 方法获取处理请求的平均总时间,以毫秒为单位,这个过程主要找到最大 95% 的响应时间

这样每个服务器的权重就是 所有服务器响应时间的总和减去响应时间 (totalResponseTime - getResponseTimeAvg) 这样得出的结果就是 响应时间越长则权重越小,则选中的几率就很小

随机策略(RandomRule)

RandomRule 主要是一个在现有服务器之间随机分配流量的负载均衡,他主要的步骤是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
return null;
}
Server server = null;

while (server == null) {
if (Thread.interrupted()) {
return null;
}
List<Server> upList = lb.getReachableServers();
List<Server> allList = lb.getAllServers();

int serverCount = allList.size();
if (serverCount == 0) {
/*
* No servers. End regardless of pass, because subsequent passes
* only get more restrictive.
*/
return null;
}

int index = rand.nextInt(serverCount);
server = upList.get(index);

if (server == null) {
/*
* The only time this should happen is if the server list were
* somehow trimmed. This is a transient condition. Retry after
* yielding.
*/
Thread.yield();
continue;
}

if (server.isAlive()) {
return (server);
}

// Shouldn't actually happen.. but must be transient or a bug.
server = null;
Thread.yield();
}

return server;

}
  1. 通过choose 方法获取到服务器实例,如果负载均衡器和服务器为0则返回 null
  2. 首先他会通过 upList 以及 allList() 分别获取存活服务器列表和所有服务器列表,之后通过 Random 来生成一个随机数生成器
  3. 如果服务器正常运行,则返回该服务,并对可以请求的服务器标注一个随机的值,这就让每次通过 choose 方法获取到的服务器实例都会有一个随机的标注。

客户端配置启用线性轮询策略(ClientConfigEnabledRoundRobinRule)

客户端配置启用线性轮询策略,从名字上可以看出他就是为了启用 RoundRobinRule 策略的,因此他的整个流程都是通过 RoundRobinRule 来使用负载均衡器。而 choose 方法则也是通过 RoundRobinRule 来实现的,如果没有使用他则会出现 :“This class has not been initialized with the RoundRobinRule class” 的报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

RoundRobinRule roundRobinRule = new RoundRobinRule();

@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
roundRobinRule = new RoundRobinRule();
}

@Override
public void setLoadBalancer(ILoadBalancer lb) {
super.setLoadBalancer(lb);
roundRobinRule.setLoadBalancer(lb);
}

@Override
public Server choose(Object key) {
if (roundRobinRule != null) {
return roundRobinRule.choose(key);
} else {
throw new IllegalArgumentException(
"This class has not been initialized with the RoundRobinRule class");
}
}

}

最大空闲策略(BestAvailableRule)

该规则主要用于选择并发请求最少的服务器实例来提供服务,延伸自 ClientConfigEnabledRoundRobinRule,通过 LoadBalancerStats 来统计每个服务器的特征和信息,以此来过滤失败的服务实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public Server choose(Object key) {
if (loadBalancerStats == null) {
return super.choose(key);
}
List<Server> serverList = getLoadBalancer().getAllServers();
int minimalConcurrentConnections = Integer.MAX_VALUE;
long currentTime = System.currentTimeMillis();
Server chosen = null;
for (Server server: serverList) {
ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
if (!serverStats.isCircuitBreakerTripped(currentTime)) {
int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
if (concurrentConnections < minimalConcurrentConnections) {
minimalConcurrentConnections = concurrentConnections;
chosen = server;
}
}
}
if (chosen == null) {
return super.choose(key);
} else {
return chosen;
}
}

如果 loadBalancerStats 不为 null,则找出最小的并发连接数(minimalConcurrentConnections)来使用。假设 loadBalancerStats 为 null,则通过 ClientConfigEnabledRoundRobinRule 类中的 choose 类来使用线性轮询策略。

过滤线性轮询策略(PredicateBasedRule)

过滤线性轮询策略主要在过滤给定的服务器列表和负载均衡器后,通过内部定义的一个过滤器来筛选出服务实例清单,之后通过线性轮询方式过滤服务实例,并从清单选取一个服务实例。

是服务器过滤逻辑的基本构建块,可用于规则和服务器列表过滤器。 谓词的输入对象是PredicateKey,里面有Server和负载均衡器的key信息。
1
2
3
4
5
6
7
8
9
10
11

```java
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
if (server.isPresent()) {
return server.get();
} else {
return null;
}
}
AbstractServerPredicate

过滤线性轮询的主要功能主要通过 AbstractServerPredicate 进行实现,而 AbstractServerPredicatePredicate 的实现。

主要是用于确定给定输入的 true 或 false,因此也被称之为 “谓词”。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
>> 在数学逻辑中,谓词通常使用大写罗马字母表示,如P、Q、R这些,根据其变量值来可能为 “真假(true or false)” 的陈述。
>>
>> 也可以理解为一个运算符或函数(布尔值函数),根据输入来返回一个 false or true。


1. 在初始阶段,他主要会通过 ```LoadBalancerStats``` 来获取负载均衡器的统计信息
1. 如果为空则返回负载均衡器的信息。
2. 不为空的话获取负载均衡器(```ILoadBalancer```),在获取负载均衡器的统计信息,并返回设置的负载均衡器统计信息(```setLoadBalancerStats```)。
1. 如果未找到 LoadalancerStats 未找到则返回 null

```java
protected LoadBalancerStats getLBStats() {
if (lbStats != null) {
return lbStats;
} else if (rule != null) {
ILoadBalancer lb = rule.getLoadBalancer();
if (lb instanceof AbstractLoadBalancer) {
LoadBalancerStats stats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
setLoadBalancerStats(stats);
return stats;
} else {
return null;
}
} else {
return null;
}
}
  1. 之后通过 getServerOnlyPredicate 来获取布尔值函数(即谓词),并通过 getEligibleServers 筛选出合格的服务器。最后使用 chooseRandomlyAfterFiltering 方法筛选并随机选择服务器实例,最后也可以通过 chooseRoundRobinAfterFiltering 方法来创建循环选择过筛选后的服务器实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(random.nextInt(eligible.size())));
}

/**
* Choose a server in a round robin fashion after the predicate filters a list of servers. Load balancer key
* is presumed to be null.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers) {
List<Server> eligible = getEligibleServers(servers);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}

/**
* Choose a random server after the predicate filters list of servers given list of servers and
* load balancer key.
*
*/
public Optional<Server> chooseRandomlyAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(random.nextInt(eligible.size())));
}

/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size()));
}
public static AbstractServerPredicate ofKeyPredicate(final Predicate<PredicateKey> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input);
}
};
}

/**
* Create an instance from a predicate.
*/
public static AbstractServerPredicate ofServerPredicate(final Predicate<Server> p) {
return new AbstractServerPredicate() {
@Override
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP")
public boolean apply(PredicateKey input) {
return p.apply(input.getServer());
}
};
}
区域感知轮询策略(ZoneAvoidanceRule)

该策略是 过滤线性轮询策略(PredicateBasedRule) 的实现类,主要以 区域和可用性过滤服务器的规则 为基础,也通过组合过滤条件和该策略本身的过滤条件相辅相成,而 AbstractServerPredicate 则为次过滤条件。

他的过滤条件也非常的简单,首先,需要通过 randomChooseZone 来随机选择服务器,之后 selectedZone 来选择区域并返回,之后 totalServerCount 统计服务器总数,以及获取实例数(getInstanceCount)。

在配合 indexsum,其中 index 及服务器总数,每发现一个 +1。而 sum 则是实例数的总和,当服务总数小于总和时,那么将会选择该区域并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
Set<String> chooseFrom) {
if (chooseFrom == null || chooseFrom.size() == 0) {
return null;
}
String selectedZone = chooseFrom.iterator().next();
if (chooseFrom.size() == 1) {
return selectedZone;
}
int totalServerCount = 0;
for (String zone : chooseFrom) {
totalServerCount += snapshot.get(zone).getInstanceCount();
}
int index = random.nextInt(totalServerCount) + 1;
int sum = 0;
for (String zone : chooseFrom) {
sum += snapshot.get(zone).getInstanceCount();
if (index <= sum) {
selectedZone = zone;
break;
}
}
return selectedZone;
}

当选择完后,就是通过 availableZones 方法获取可用区域,如果可用区数量等于1,则返回可用区(availableZones),当实例数为0时,开始移出可用区。

1
2
3
if (availableZones.size() == 1) {
return availableZones;
}

移出可用区的方式有很多,其中就是 获取心跳次数除于实例总数大于等于服务无回应百分比或每台服务器负载,小于0的,也会被移出可用区。

1
2
3
4
5
6
if (((double) zoneSnapshot.getCircuitTrippedCount())
/ instanceCount >= triggeringBlackoutPercentage
|| loadPerServer < 0) {
availableZones.remove(zone);
limitedZoneAvailability = true;
}

或者说每台服务器负载(loadPerServer)减去每台服务器最大负载(maxLoadPerServer)小于 0.000001d,则会被添加至最差区域(worstZones)。

1
2
3
4
5
6
7
8
9
if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
// they are the same considering double calculation
// round error
worstZones.add(zone);
} else if (loadPerServer > maxLoadPerServer) {
maxLoadPerServer = loadPerServer;
worstZones.clear();
worstZones.add(zone);
}

也可以通过每台服务器负载(loadPerServer)大于服务器最大负载(maxLoadPerServer)的方式让其加入到最差区域中。

至于随机选择区(randomChooseZon),如果不为0,也会被移出可用区(因为避免区域被你霸占了),否则返回可用区。

1
2
3
4
5
6
7
    String zoneToAvoid = randomChooseZone(snapshot, worstZones);
if (zoneToAvoid != null) {
availableZones.remove(zoneToAvoid);
}
return availableZones;

}

最后,当一系列的区域过滤完成后,通过线性轮询的方式从过滤结果中选出一个服务实例。

可用性过滤策略(AvailabilityFilteringRule)

该策略根据宕机或超过请求时限的活动连接来分配权重,他是 PredicateBasedRule 的延伸,其主要还是通过 AbstractServerPredicate 来实现具体的功能。

Ribbon 负载均衡策略自定义

自定义负载均衡策略需要先运行 consul 除服务消费者之外的集群,之后新建项目添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-netflix-ribbon</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
<version>3.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.netflix.ribbon/ribbon-loadbalancer -->
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-loadbalancer</artifactId>
<version>2.7.18</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
</dependencies>

application.properties

1
2
3
4
5
spring.application.name=Ribbon Rule
server.port=9003
spring.cloud.consul.host=localhost
spring.cloud.consul.discovery.service-name=service-provider # 服务提供者名字
spring.cloud.consul.discovery.register=false # 是否注册服务

Application

在启动类中通过 @LoadBalancd 以及 @Bean 来实例化 restTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;

import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

/**
* 启动类,通过 @LoadBalanced 来开启客户端的负载均衡
* @author kunlun
* @date 2021/06/22
*/
@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}


/**
* 实例化 RestTemplate
* @return RestTemplate
*/
@LoadBalanced
@Bean
RestTemplate restTemplate() {
return new RestTemplate();
}

}

RibbonConfig

在配置类中启用 Configuration 注解,并使用 RibbonClient 来配置服务提供者名称,最后通过 Bean 注解实例化负载均衡策略(RandomRule)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import com.netflix.loadbalancer.IRule;
import com.netflix.loadbalancer.RandomRule;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.netflix.ribbon.RibbonClientConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 配置类,通过 @RibbonClient 注解来配置服务名以及实例化
* @author kunlun
* @date 2021/06/29
*/
@Configuration
@RibbonClient(name = "service-provider", configuration = RibbonClientConfiguration.class)
public class RibbonConfig {

/**
* 实例化 Ribbon 的随机策略(RandomRule)
* @return RandomRule
*/
@Bean
public IRule irule() {
return new RandomRule();
}
}

TestController

测试类主要用于通过负载均衡器来使用负载均衡策略实现选择服务实例的效果:

运行后浏览器打开 http://localhost:9003/hey 每次刷新会得到不同的服务提供者实例返回的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;

@RestController
public class TestController {

@Autowired
private LoadBalancerClient loadBalancerClient;

Date date = new Date();

SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyy-MM-dd hh:mm:ss");

@GetMapping("/hey")
public String hey() {
ServiceInstance serviceInstance = loadBalancerClient.choose("service-provider");
String callService = "Host: " + serviceInstance.getHost() + " Port: " + serviceInstance.getPort() + " Date:" + simpleDateFormat.format(date);
return callService;
}

}

Feign

需要注意的是 Feign 是除了 RestTemplate 客户端实现服务发现外 ,可以实现服务发现的另一种方式,通过整合 Ribbon 好后可以提供负载均衡的功能。

Feign 与 Ribbon 最为不同的就是调用方式,通常情况下 Ribbon 需要自己构建一个 HTTP 请求,然后 RestTemplate 将该请求发送给其他服务,RibbonClient(value = "serverName") 就印证了这一点。

而 Feign 是在 Ribbon 的基础上在进行一层封装,因为是采用接口的方式他并不需要自己构建 HTTP 请求。只需要将其他服务方法定义成抽象方法即可@FeignClient("ServerName") 注解来调用服务。

属性值应与服务中心方法名一致
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

#### Feign 工作流程
上述这些与 Feugn 的工作原理密不可分,在正常的情况下他有四个步骤,分别为:
![](https://49812933408852955071488026628034-1301075051.cos.ap-nanjing.myqcloud.com/20210701034924.png)

在这个过程中,如果使用了 ```@FeignClient``` 注解,那么 Feign 客户端将会创建一个动态代理。
之后调用这个接口(即调用了 Feign 客户端所创建的动态代理),而 Feign 客户端的动态代理会根据接口上的 ```@RequestMapping``` 注解来构造出地址以及方法。
最后发起请求并解析响应

![](https://49812933408852955071488026628034-1301075051.cos.ap-nanjing.myqcloud.com/20210701030423.png)

服务提供者 **调用了** 定义 ```@FeignClient``` 注解的接口时,Feign 会构建一个动态代理,之后构造地址
最终向接口发送请求(也就是服务提供者)

#### Feign 负载均衡
![](https://49812933408852955071488026628034-1301075051.cos.ap-nanjing.myqcloud.com/20210701111804.gif)
Feign 是一个声明式的 Web Servce 客户端,Spring cloud 客户端添加了Spring MVC 的支持,Feign 在整合了 Ribbon 后即可一共负载均衡的功能,在此之前我们需要添加依赖:

```xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

以及在 application.properties 全局配置文件中配置服务端口以及名称等:

1
2
3
4
5
spring.application.name=Feign Configuration
server.port=8780
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.register=false

然后在启动类中通过 EnableDiscoveryClient 来启用服务发现,并使用 @EnableFeignClients 注解来启动 Feign 客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

/**
* 启动类,通过 @EnableDiscoveryClient 实现服务中心,以及 @EnableFeignClients 来支持 Feign
* @author kunlun
* @date 2021/7/1
*/
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class DemoApplication {

public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}

}
Config

在配置类中我们主要用于实现使用默认契约,即 Spring cloud Netfilx MVC Controller 改为 feign.Contract.Defaull 原生契约。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.example.demo.config;

import feign.Contract;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 自定义 Feign 配置
* @author kunlun
* @date 2021/7/1
*/
@Configuration
public class FeignConfig {

/**
* 将 Spring cloud MVC Netfilx Controller 改为 feign.Contract.Defaull
* 之后契约将默认改为 Feign 原生契约,之后可以使用默认注解
* @return feign.Contract.Defaull
*/
@Bean
public Contract contract() {
return new Contract.Default();
}
}

Interface

为了通过 consul 服务发现提供者,因此需要通过 FeignClient 注解来进行连接,也就是 Feign 工作流程中的 “构造地址‘,以及 Feign 的配置等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.example.demo;

import feign.RequestLine;
import org.springframework.cloud.openfeign.FeignClient;
import com.example.demo.config.FeignConfig;

/**
* FeignClient 接口
* @author kunlun
* @date 2021/7/1
*/
@FeignClient(contextId = "feignClient", name = "service-provider", configuration = FeignConfig.class)
public interface FeignClientInterface {

/**
* 通过 @RequestLine 将 Spring MVC 注解修改为原生的 @RequestLine
* @return @RequestLine
*/
@RequestLine("GET /hey")
public String hey();

}
controller

最后的 Feign 控制器则是为了提供服务,因此只需要通过调用 Feigin 接口即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.example.demo.controller;

import com.example.demo.FeignClientInterface;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 调用接口以此来提供服务
* @author kunlun
* @date 2021/7/1
*/
@RestController
public class FeignController {

@Autowired
FeignClientInterface feignClientInterface;

@GetMapping("/hey")
public String index() {
return feignClientInterface.hey();
}
}

Feign 记录日志

在 Feign 中,记录日志的等级可以分为四个,分别为:

ID DA
NONE 不记录(默认)
BASIC 只记录和请求方法、URL、相应状态码以及执行时间
HEADERS 只记录基本信息,请求和响应的标题
FULL 记录请求、响应的标题以及正文和元数据

在 Feigin 负载均衡的基础上,我们实际上只需要修改或添加 config 包下的 FeignConfig 类即可:

FeignConfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.example.demo.config;

import feign.Contract;
import feign.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 自定义 Feign 配置
* @author kunlun
* @date 2021/7/1
*/
@Configuration
public class FeignConfig {

/**
* 将 Spring cloud MVC Controller 改为 feign.Contract.Defaull
* 之后契约将默认改为 Feign 原生契约,之后可以使用默认注解
* @return feign.Contract.Defaull
*/
@Bean
public Contract contract() {
return new Contract.Default();
}

/**
* 日志管理器记录等级为 FULL
* @return Logger.Level.Full
*/
@Bean
Logger.Level level() {
return Logger.Level.FULL;
}
}

Logger 是一个日志管理器,与 level 组合为 Logger.Level 即日志管理器等级

application.properties

之后在 applicatin.properties 全局配置文件中添加 logging.level.com.example.demo=DEBUG 来开启记录日志的包:

1
2
3
4
5
6
spring.application.name=Feign Configuration
server.port=8780
spring.cloud.consul.host=localhost
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.register=false
logging.level.com.example.demo=DEBUG
⬅️ Go back